package cn.jdemo.juc.core.util.async.executor;

import cn.jdemo.juc.core.util.async.wrapper.WorkerWrapper;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

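/**
 * @description Entry point that submits a group of {@link WorkerWrapper}s to a thread pool and
 *              blocks, up to a timeout, until all of them have finished.
 * @date 2020/12/1
 */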
public class Async {

    /**
     * Default pool used when the caller does not supply one.
     * <p>
     * NOTE(review): the work queue is an unbounded {@link LinkedBlockingQueue}, so under the
     * {@link ThreadPoolExecutor} rules new threads beyond the core size are only created when
     * the queue is full, which never happens here. The effective pool size is therefore
     * {@code availableProcessors * 2} and the maximum of 1024 is never reached. Left unchanged
     * to keep existing behavior; confirm whether that is intended.
     */
    public static final ExecutorService es = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
            1024, 5000, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());

    /**
     * Submits every wrapper in {@code workerWrappers} to {@code pool} and blocks until all of
     * them have finished or {@code timeout} elapses.
     * <p>
     * On timeout, {@code stopNow()} is called on every given wrapper and on every wrapper
     * reachable from them through {@code getNextWrappers()}.
     *
     * @param timeout        maximum time to wait, in milliseconds; also handed to each wrapper
     * @param pool           executor that runs the wrappers; {@code null} falls back to {@link #es}
     * @param workerWrappers wrappers to start
     * @return {@code true} if all submitted wrappers finished within the timeout; {@code false}
     *         if no wrappers were given or the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if any submitted wrapper task completed exceptionally
     */
    public static boolean beginWork(long timeout, ExecutorService pool, List<WorkerWrapper> workerWrappers)
            throws InterruptedException, ExecutionException {
        if (workerWrappers == null || workerWrappers.isEmpty()) {
            return false;
        }
        // A null pool used to fail with a NullPointerException inside runAsync; use the default instead.
        final ExecutorService executor = (pool != null) ? pool : es;

        // Shared by every worker of this run: a map holding all wrappers, keyed by the wrapper's
        // unique id, so that a wrapper's result can be looked up from its value.
        Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();

        CompletableFuture<?>[] futures = new CompletableFuture<?>[workerWrappers.size()];
        int index = 0;
        for (WorkerWrapper wrapper : workerWrappers) {
            futures[index++] = CompletableFuture.runAsync(
                    () -> wrapper.work(executor, timeout, forParamUseWrappers), executor);
        }
        try {
            // Block until all of them have completed or failed.
            CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            e.printStackTrace();
            // Stop the given wrappers and everything downstream of them.
            Set<WorkerWrapper> set = new HashSet<>();
            totalWorkers(workerWrappers, set);
            for (WorkerWrapper wrapper : set) {
                wrapper.stopNow();
            }
            return false;
        }
    }

    /**
     * Collects into {@code set} the given wrappers plus every wrapper reachable from them via
     * {@code getNextWrappers()}.
     * <p>
     * The walk is iterative and only expands wrappers that were newly added to {@code set}, so
     * a wrapper shared by several predecessors is visited once and a cyclic graph terminates.
     * {@code null} entries are skipped.
     *
     * @param workerWrappers starting wrappers
     * @param set            receives all collected wrappers
     */
    private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
        Deque<WorkerWrapper> pending = new ArrayDeque<>();
        enqueueUnseen(workerWrappers, set, pending);
        while (!pending.isEmpty()) {
            List<WorkerWrapper> next = pending.pop().getNextWrappers();
            if (next != null) {
                enqueueUnseen(next, set, pending);
            }
        }
    }

    /** Adds each non-null wrapper not yet in {@code seen} to both {@code seen} and {@code pending}. */
    private static void enqueueUnseen(List<WorkerWrapper> candidates, Set<WorkerWrapper> seen,
                                      Deque<WorkerWrapper> pending) {
        for (WorkerWrapper candidate : candidates) {
            if (candidate != null && seen.add(candidate)) {
                pending.push(candidate);
            }
        }
    }

    /**
     * Same as {@link #beginWork(long, ExecutorService, WorkerWrapper...)} using the default
     * pool {@link #es}.
     *
     * @param timeout       maximum time to wait, in milliseconds
     * @param workerWrapper wrappers to start
     * @return {@code true} if all wrappers finished in time; {@code false} if none were given
     *         or the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if any submitted wrapper task completed exceptionally
     */
    public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws InterruptedException,ExecutionException{
        return beginWork(timeout, es, workerWrapper);
    }

    /**
     * Varargs convenience form of {@link #beginWork(long, ExecutorService, List)}.
     *
     * @param timeout       maximum time to wait, in milliseconds
     * @param pool          executor that runs the wrappers
     * @param workerWrapper wrappers to start
     * @return {@code true} if all wrappers finished in time; {@code false} if none were given
     *         or the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if any submitted wrapper task completed exceptionally
     */
    public static boolean beginWork(long timeout,ExecutorService pool, WorkerWrapper... workerWrapper) throws InterruptedException,ExecutionException{
        if (workerWrapper == null || workerWrapper.length == 0) {
            return false;
        }
        List<WorkerWrapper> workerWrappers = new ArrayList<>(Arrays.asList(workerWrapper));
        return beginWork(timeout, pool, workerWrappers);
    }
}
